package com.yyq.bigexport.util.insert;

import cn.hutool.core.thread.NamedThreadFactory;
import com.yyq.bigexport.service.UserService;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Static helper that splits a bulk insert into fixed-size batches and runs the batches
 * on a shared thread pool, blocking the caller until every batch has reported completion.
 */
@Slf4j
public class InsertUtils {

    /** Maximum number of records handed to a single {@code InsertTask}. */
    private static final int BATCH_SIZE = 50000;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAX_POOL_SIZE = 3;

    /**
     * Shared pool that runs the insert tasks.
     *
     * <p>NOTE(review): the work queue is an unbounded {@link LinkedBlockingQueue}, so per the
     * {@link ThreadPoolExecutor} contract the pool never grows beyond {@code CORE_POOL_SIZE};
     * {@code MAX_POOL_SIZE} and the 30 second keep-alive currently have no effect. The original
     * comment spoke of 20 threads, which this configuration does not provide — confirm the
     * intended concurrency.
     *
     * <p>The {@code false} passed to {@code NamedThreadFactory} is presumably the daemon flag
     * (non-daemon threads) — verify against the hutool version in use.
     */
    private static final ThreadPoolExecutor executorService = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            30,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory("batch-pool-", false));

    /**
     * Submits one {@code InsertTask} per batch and waits until all of them have counted down
     * the shared latch.
     *
     * <p>Every batch receives {@code BATCH_SIZE} as its size except the last one, which receives
     * only the remainder so that the sizes add up to exactly {@code totalRecords}. This assumes
     * the second {@code InsertTask} constructor argument is the number of records that task
     * inserts, and that each task counts the latch down exactly once when it finishes — TODO
     * confirm against {@code InsertTask}. If a task never counts down, this method blocks
     * indefinitely.
     *
     * <p>Failures while submitting or waiting are logged and not rethrown. If the waiting thread
     * is interrupted, its interrupt status is restored before returning.
     *
     * @param userService  service instance passed through to every {@code InsertTask}
     * @param totalRecords total number of records to insert; non-positive values are logged
     *                     and ignored
     */
    public static void insert(UserService userService, int totalRecords) {
        // A non-positive total would yield a zero or negative latch count; there is nothing to do.
        if (totalRecords <= 0) {
            log.warn("总记录数必须大于0，当前值：{}，跳过插入", totalRecords);
            return;
        }

        // Ceiling division written without the addition so it cannot overflow int.
        int batchCount = totalRecords / BATCH_SIZE + (totalRecords % BATCH_SIZE == 0 ? 0 : 1);

        log.info("任务分析：总记录-{}, 每次插入数量-{}, 批次数-{}", totalRecords, BATCH_SIZE, batchCount);

        // One count per batch; await() below returns once the count reaches zero.
        CountDownLatch countDownLatch = new CountDownLatch(batchCount);

        try {
            long currTime = System.currentTimeMillis();

            int remaining = totalRecords;
            for (int i = 0; i < batchCount; i++) {
                // The final batch only covers what is left, so the total is not exceeded.
                int currentBatchSize = Math.min(BATCH_SIZE, remaining);
                executorService.submit(new InsertTask(
                        userService,
                        currentBatchSize,
                        countDownLatch
                ));
                remaining -= currentBatchSize;
            }

            // Block until every submitted task has counted down.
            countDownLatch.await();
            log.info("所有线程完成，插入数据总量：{}，共耗时：{}s!", totalRecords, (System.currentTimeMillis() - currTime) / 1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error("插入异常：{}", e.getMessage(), e);
        } catch (Exception e) {
            log.error("插入异常：{}", e.getMessage(), e);
        }
    }
}
